package com.shiku.imserver.service;

import com.shiku.commons.task.TimerTask;
import com.shiku.imserver.CoreService;
import com.shiku.imserver.common.IService;
import com.shiku.imserver.common.message.AbstractMessage;
import com.shiku.imserver.common.message.ChatMessage;
import com.shiku.imserver.common.packets.ImPacket;
import com.shiku.imserver.common.utils.StringUtils;
import com.shiku.imserver.message.MessageFactory;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import org.redisson.api.RList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;

/**
 * Tracks chat messages that have been pushed to a client and are still waiting for the
 * client's receipt.
 *
 * <p>Pending messages are kept per channel in a map stored as the {@code "message_map"}
 * attribute of the {@link ChannelContext}. The first pending message on a channel also starts
 * a periodic {@link SkAckTimeoutTask} that scans that map for entries whose receipt is overdue.
 * The task is stored as the {@code "waitack_task"} attribute so that {@link #logout} can stop it.
 */
public class ReceiptLogicService implements IService {
  private static final Logger log = LoggerFactory.getLogger(ReceiptLogicService.class);

  /** Milliseconds an entry may stay un-acknowledged before the timeout task handles it. */
  private static final long ACKTIMEOUT = 15000L;

  /** Entries whose retry counter has reached this value are no longer re-sent. */
  private static final long RESEND_COUNT = 5L;

  /** Channel attribute key holding the {@code Map<String, Entry>} of pending messages. */
  private static final String MESSAGE_MAP = "message_map";

  /** Channel attribute key holding the channel's {@link SkAckTimeoutTask}. */
  private static final String OUT_WAIT_TASK = "waitack_task";

  /** Initial delay and period, in seconds, of the per-channel timeout scan. */
  private static final long SCAN_PERIOD_SECONDS = 10L;

  /**
   * Chat type whose messages are handed to {@code storeChatOffMessage} when they cannot be
   * delivered. NOTE(review): presumably the one-to-one chat type - confirm against ChatMessage.
   */
  private static final int OFFLINE_STORED_CHAT_TYPE = 1;

  private ScheduledExecutorService ecutorScheduler;

  /**
   * Creates the scheduler that runs the per-channel timeout tasks, sized to the number of
   * available processors.
   *
   * @return always {@code true}
   */
  public boolean initialize() {
    this.ecutorScheduler = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
    return true;
  }

  /**
   * Registers a message that has been sent out and is now waiting for the client's receipt.
   *
   * <p>If the message id is already pending, its retry counter is incremented and its
   * timestamp refreshed instead of creating a second entry.
   *
   * @param channelContext channel the message was sent on
   * @param message        the sent message; {@code null} is ignored
   */
  public void willMessageReceipt(ChannelContext channelContext, ChatMessage message) {
    if (null == message) {
      return;
    }

    Map<String, Entry> messageMap = pendingMessages(channelContext);
    if (null == messageMap) {
      messageMap = new ConcurrentHashMap<>();
      SkAckTimeoutTask ackTimeoutTask = new SkAckTimeoutTask(channelContext);
      // Start the periodic timeout scan and keep its handle: without the ScheduledFuture the
      // task can never be stopped and would keep the channel context alive forever.
      ScheduledFuture<?> future = this.ecutorScheduler.scheduleAtFixedRate(
          (Runnable) ackTimeoutTask, SCAN_PERIOD_SECONDS, SCAN_PERIOD_SECONDS, TimeUnit.SECONDS);
      ackTimeoutTask.setFuture(future);
      channelContext.setAttribute(MESSAGE_MAP, messageMap);
      // Publish the task so logout() can find and stop it.
      channelContext.setAttribute(OUT_WAIT_TASK, ackTimeoutTask);
    }

    String messageId = message.getMessageHead().getMessageId();
    Entry existing = messageMap.putIfAbsent(messageId, new Entry(message));
    if (null != existing) {
      // Same message registered again: count it as a retry.
      existing.addReCount();
    }
  }

  /**
   * Handles a receipt from the client: removes the acknowledged messages from the Redis
   * "will message" store and from the channel's pending map.
   *
   * @param channelContext channel the receipt arrived on
   * @param messageId      one or more message ids separated by {@code ','}; empty segments are
   *                       skipped, {@code null} or empty input is a no-op
   * @param roomJid        not used by this method
   */
  public void removeMessage(ChannelContext channelContext, String messageId, String roomJid) {
    if (null == messageId || messageId.isEmpty()) {
      return;
    }
    String[] messageIds = messageId.split(",");
    for (String msgId : messageIds) {
      if (!StringUtils.isEmpty(msgId)) {
        IMBeanUtils.getRedisService().removeWillMessage(Integer.valueOf(channelContext.userid), msgId);
      }
    }

    Map<String, Entry> messageMap = pendingMessages(channelContext);
    if (null == messageMap) {
      return;
    }
    for (String msgId : messageIds) {
      if (!StringUtils.isEmpty(msgId)) {
        messageMap.remove(msgId);
      }
    }
  }

  /**
   * Handles a user logout: merges the user's Redis "will message" list into the pending map,
   * stores every pending message of the offline-stored chat type as an offline message, clears
   * both stores and stops the channel's timeout task.
   *
   * <p>The task is stopped in a {@code finally} block so that a failure while flushing does not
   * leave it running.
   *
   * @param channelContext channel of the user that logged out
   */
  public void logout(ChannelContext channelContext) {
    SkAckTimeoutTask ackTimeTask = (SkAckTimeoutTask) channelContext.getAttribute(OUT_WAIT_TASK);
    try {
      Map<String, Entry> messageMap = pendingMessages(channelContext);

      // Synchronize the Redis list with the in-memory map.
      RList<ChatMessage> willList =
          IMBeanUtils.getRedisService().getWillMessage(Integer.valueOf(channelContext.userid));
      if (null != willList) {
        for (ChatMessage chatMessage : willList) {
          if (null == messageMap) {
            messageMap = new ConcurrentHashMap<>();
          }
          messageMap.putIfAbsent(chatMessage.getMessageHead().getMessageId(), new Entry(chatMessage));
        }
        willList.clear();
      }

      if (null != messageMap) {
        for (Entry entry : messageMap.values()) {
          ChatMessage pending = entry.getMessage();
          if (OFFLINE_STORED_CHAT_TYPE == pending.getMessageHead().getChatType()) {
            IMBeanUtils.getMessageRepository().storeChatOffMessage(pending);
          }
        }
        messageMap.clear();
      }
    } finally {
      if (null != ackTimeTask) {
        ackTimeTask.cancel();
        // cancel() alone is not known to unschedule the task from the executor; cancelling
        // the ScheduledFuture is what actually stops the periodic runs.
        ackTimeTask.stopSchedule();
      }
    }
  }

  /**
   * @return always {@code false}
   */
  public boolean startupAfter() {
    return false;
  }

  /** Reads the channel's pending-message map; {@code null} when none has been created yet. */
  @SuppressWarnings("unchecked") // the attribute is only ever written by this class with this type
  private static Map<String, Entry> pendingMessages(ChannelContext channelContext) {
    return (Map<String, Entry>) channelContext.getAttribute(MESSAGE_MAP);
  }

  /**
   * Periodic scan of one channel's pending messages. Entries older than {@link #ACKTIMEOUT}
   * are removed and either re-sent (retry counter below {@link #RESEND_COUNT} and channel still
   * open) or, for the offline-stored chat type, stored as offline messages.
   */
  private static class SkAckTimeoutTask extends TimerTask {
    private final ChannelContext channelContext;

    /** Handle returned by the scheduler; set right after scheduling, read on logout. */
    private volatile ScheduledFuture<?> future;

    public SkAckTimeoutTask(ChannelContext channelContext) {
      this.channelContext = channelContext;
    }

    void setFuture(ScheduledFuture<?> future) {
      this.future = future;
    }

    /** Stops further periodic runs; a run already in progress is allowed to finish. */
    void stopSchedule() {
      ScheduledFuture<?> scheduled = this.future;
      if (null != scheduled) {
        scheduled.cancel(false);
      }
    }

    public void run() {
      try {
        scanPendingMessages();
      } catch (Exception e) {
        // scheduleAtFixedRate suppresses all later runs once a run throws, so a single
        // transient failure must not escape from here.
        ReceiptLogicService.log.error("ack timeout scan failed for user {}",
            null == this.channelContext ? null : this.channelContext.userid, e);
      }
    }

    private void scanPendingMessages() {
      if (null == this.channelContext) {
        return;
      }
      Map<String, Entry> messageMap = pendingMessages(this.channelContext);
      if (null == messageMap) {
        return;
      }

      long now = System.currentTimeMillis();
      for (Entry entry : messageMap.values()) {
        // Still inside the 15 second receipt window.
        if (now - entry.getStamp() < ReceiptLogicService.ACKTIMEOUT) {
          continue;
        }

        ChatMessage message = entry.getMessage();
        String messageId = message.getMessageHead().getMessageId();
        boolean retryAllowed = ReceiptLogicService.RESEND_COUNT > entry.getReCount();
        if (!retryAllowed) {
          ReceiptLogicService.log.info(" {}  5   {}", this.channelContext.userid, messageId);
        }

        // NOTE(review): the entry is dropped before the re-send, so its retry counter is
        // discarded with it. If CoreService.send registers the message again through
        // willMessageReceipt, that registration starts from a fresh counter - confirm that
        // the retry limit is actually reachable.
        messageMap.remove(messageId);
        IMBeanUtils.getRedisService().removeWillMessage(Integer.valueOf(this.channelContext.userid), messageId);

        if (retryAllowed && !this.channelContext.isClosed) {
          ImPacket packet = MessageFactory.convertToImPacket((AbstractMessage) message);
          packet.setMessage(message);
          CoreService.send(this.channelContext, packet);
        } else if (OFFLINE_STORED_CHAT_TYPE == message.getMessageHead().getChatType()) {
          // Channel closed or retries exhausted: keep the message as an offline message.
          IMBeanUtils.getMessageRepository().storeChatOffMessage(message);
        }
      }
    }
  }

  /**
   * A pending message together with the time it was (last) registered and its retry counter.
   * The mutable fields are volatile because entries are written by the threads calling
   * {@code willMessageReceipt} and read by the scheduler threads.
   */
  private static final class Entry {
    private final ChatMessage message;

    private volatile long stamp = System.currentTimeMillis();

    private volatile int reCount = 0;

    public Entry(ChatMessage message) {
      this.message = message;
    }

    public long getStamp() {
      return this.stamp;
    }

    public void refreshStamp() {
      this.stamp = System.currentTimeMillis();
    }

    /** Increments the retry counter and refreshes the timestamp. */
    public synchronized void addReCount() {
      this.reCount++;
      refreshStamp();
    }

    public int getReCount() {
      return this.reCount;
    }

    public ChatMessage getMessage() {
      return this.message;
    }
  }
}
